package day02.transform;

import beans.SensorReading;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Flink 流处理 API - reduce
 *
 * @author lvbingbing
 * @date 2021-12-09 23:59
 */
public class FlinkTransform03 {
    public static void main(String[] args) throws Exception {
        // 1、创建FlinkTransform00对象，有参构造会初始化 env，并从文件中读取数据
        int parallelism = 1;
        FlinkTransform00 flinkTransform = new FlinkTransform00(parallelism);
        // 2、获取执行环境
        StreamExecutionEnvironment env = flinkTransform.getEnv();
        // 3、学习 Reduce
        studyReduce(flinkTransform.getSensorReadingStream());
        // 4、触发程序执行
        env.execute();
    }

    /**
     * reduce()：KeyedStream → DataStream：一个分组数据流的聚合操作，合并当前的元素和上次聚合的结果，产生一个新的值，
     * 返回的流中包含每一次聚合的结果，而不是只返回最后一次聚合的最终结果。
     *
     * @param sensorReadingStream <br>
     */
    private static void studyReduce(DataStream<SensorReading> sensorReadingStream) {
        // 分组
        KeyedStream<SensorReading, String> keyedStream = sensorReadingStream.keyBy(SensorReading::getId);
        // 滚动聚合，获取每个传感器，在截止到某个时间点的最大温度值
        DataStream<SensorReading> reduceStream = keyedStream.reduce((value1, value2) -> new SensorReading(value1.getId(), value2.getTimestamp(), Math.max(value1.getTemperature(), value2.getTemperature())));
        reduceStream.print("reduce");
    }
}
